iT邦幫忙

2025 iThome 鐵人賽

DAY 12
0

Tokio task 的通信

通常來說,對於允許併發多執行分支的內核或引擎來說,都需要提供對應的通信機制和同步機制。

例如,多進程之間,有進程間通信方式,比如管道、套接字、共享內存、消息隊列等,還有進程間的同步機制,例如信號量、文件鎖、條件變量等。多線程之間,也有線程間通信方式,簡單粗暴的是直接共享同進程內存,同步機制則有互斥鎖、條件變量等。

tokio提供了異步多任務的併發能力,它也需要提供異步任務之間的通信方式和同步機制。

分為底下幾種通道

oneshot 通道

就像告知狀態,oneshot是一對一的通知

範例如下:

use tokio::sync::oneshot;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();

    // 撮合引擎 task 等待風控訊號
    tokio::spawn(async move {
        println!("撮合引擎等待風控通知...");
        match rx.await {
            Ok(msg) => println!("撮合引擎收到風控通知: {}", msg),
            Err(_) => println!("風控通道被關閉,無法收到通知"),
        }
    });

    // 模擬外部風控邏輯
    tokio::spawn(async move {
        // 模擬風控檢查花 3 秒
        tokio::time::sleep(Duration::from_secs(3)).await;

        let condition = true; // 可以換成 false 試試不送的狀況
        process_risk_check(tx, condition);
    }).await.unwrap(); // 等待風控執行完
}

// 外部邏輯做條件判斷
fn process_risk_check(tx: oneshot::Sender<&'static str>, condition: bool) {
    if condition {
        let _ = tx.send("開始撮合");
    } else {
        println!("風控未通過,不送出撮合指令");
        // 如果沒 send,撮合 task 就會一直等住
    }
}

在這裡 match 需要寫在 spawn 外面 主程式的其他 function 才可以正常執行

如果將上面改為 false 執行 結果如同底下

https://ithelp.ithome.com.tw/upload/images/20250816/20177999piAdIWWne5.png

mpsc

就像是使用不同裝置下單,是多對一的狀態

範例如下

use tokio::sync::mpsc;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // 建立 channel,容量為 10
    let (tx, mut rx) = mpsc::channel::<String>(10);

    // 撮合引擎 task:負責接收所有訂單
    tokio::spawn(async move {
        while let Some(order) = rx.recv().await {
            println!("撮合引擎收到訂單:{}", order);
        }
    });

    // 模擬三個下單來源:API、手機、網頁
    let tx_api = tx.clone();
    let tx_mobile = tx.clone();

    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(1)).await;
        let _ = tx_api.send("API 下單: 買 BTC 1 張".to_string()).await;
    });

    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(1)).await;
        let _ = tx_mobile.send("手機下單: 賣 ETH 2 張".to_string()).await;
    });

    tokio::time::sleep(Duration::from_secs(1)).await;
    let _ = tx.send("網頁下單: 買 DOGE 100 張".to_string()).await;

}

實際執行後的範例如下

https://ithelp.ithome.com.tw/upload/images/20250816/20177999SGEDOxaPBW.png

可以看到接受的順序是未知的

broadcast

一對多的傳送訊息

這裡舉一個跨檔案的例子

use tokio::sync::broadcast;
use tokio::time::{sleep, Duration};

pub async fn run_ui_listener(name: &str, mut rx: broadcast::Receiver<&'static str>) {
    let name = name.to_string();

    tokio::spawn(async move {
        while let Ok(msg) = rx.recv().await {
            println!("[{}] 收到交易所通知:{}", name, msg);
        }
    });

    // 模擬 UI 正在運行
    sleep(Duration::from_secs(1)).await;
}
mod ui;

use tokio::sync::broadcast;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (tx, _) = broadcast::channel::<&'static str>(16);

    // 啟動三個 UI 接收端
    ui::run_ui_listener("Web UI", tx.subscribe()).await;
    ui::run_ui_listener("Mobile App", tx.subscribe()).await;
    ui::run_ui_listener("API Client", tx.subscribe()).await;

    // 等待所有訂閱者啟動
    sleep(Duration::from_secs(1)).await;

    // 廣播一條系統通知
    let _ = tx.send("系統將於 12:00 暫停交易");

    // 等待訊息印出
    sleep(Duration::from_secs(1)).await;
}

實際的輸出結果就像

https://ithelp.ithome.com.tw/upload/images/20250816/20177999Qqj1z7p82t.png

watch

同樣是一對多,但和 broadcast 不一樣的是 watch 永遠只保存一個數據,所以只有最新的狀態

範例如下

use tokio::sync::watch;
use tokio::time::{sleep, Duration};

pub async fn run_component(name: &str, mut rx: watch::Receiver<&'static str>) {
    let name = name.to_string();

    tokio::spawn(async move {
        loop {
            // 等待狀態改變
            if rx.changed().await.is_ok() {
                let state = *rx.borrow();
                println!("[{}] 市場狀態更新為:{}", name, state);

                // 模擬收到關閉就停止操作
                if state == "closed" {
                    println!("[{}] 停止操作", name);
                    break;
                }
            }
        }
    });
}
mod module;

use tokio::sync::watch;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // 建立 watch channel,初始狀態為「尚未開盤」
    let (tx, rx) = watch::channel("pending");

    // 啟動三個模組去監聽市場狀態
    module::run_component("前台 Web", rx.clone()).await;
    module::run_component("撮合引擎", rx.clone()).await;
    module::run_component("API Gateway", rx.clone()).await;

    // 模擬一秒後開盤
    sleep(Duration::from_secs(1)).await;
    let _ = tx.send("open");

    // 模擬再過兩秒收盤
    sleep(Duration::from_secs(2)).await;
    let _ = tx.send("closed");

    // 等一點時間讓所有模組印出
    sleep(Duration::from_secs(2)).await;
}

實際的運行結果如下

https://ithelp.ithome.com.tw/upload/images/20250816/20177999JyJO5Z7kO5.png


上一篇
【Day11】- Rust(Tokio)
下一篇
【Day13】- Rust(專案管理)
系列文
NautilusTrader 架構解析:Rust 在高效能量化交易平台中的角色與優勢22
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言